/**
 * FileName: ProcessCfgUtil
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2019/3/13 9:45
 * Description: 处理process
 */
package cn.com.bonc.util;

import org.apache.spark.sql.Column;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.expr;
import static org.apache.spark.sql.functions.window;

public class ProcessCfgUtil {

    private static Properties prop = new Properties();

    static {
        try {
            InputStream in = ProcessCfgUtil.class
                    .getClassLoader().getResourceAsStream("process.properties");
            prop.load(in);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 获取properties文件中spark.sql.*属性
     * 需要注意的是，由于采用字符串默认排序，为了保证sql顺序
     * 当数量不超过十个应该 spark.sql.1表示第一个
     * 总是以最大的位数来表示序号，若存在12条 spark.sql.01表示第一个
     * @return 将属性以集合形式返回
     */
    public static List<String> getSqlList(){
        ArrayList<String> resultList = new ArrayList<>();
        prop.stringPropertyNames()
                .parallelStream()
                .filter(p->Pattern.compile("^spark.sql.[0-9]\\d*$").matcher(p).matches())
                .sorted()
                .collect(Collectors.toList())
                .forEach(k->resultList.add(prop.getProperty(k)));
        return resultList;
    }

    /**
     * 获取要存储的列
     * @return 返回Colum数组，当属性不存在或为空时返回 null
     */
    public static Column[] getSinkCols(){
        if (isPropertyNullOrEmpty("spark.sql.sink.cols")){
            return null;
        }
        String[] properties = prop.getProperty("spark.sql.sink.cols").split(",");
        return getCols(properties);
    }

    /**
     * 将列名数组转换为列数组
     * @param cols 列名数组
     * @return 列数组
     */
    public static Column[] getCols(String[] cols) {
        Column[] columns = new Column[cols.length];
        int index=0;
        for (String prop :cols){
            columns[index++]=col(prop).cast("string").as(prop);
        }
        return columns;
    }

    /**
     * 判断是否进行窗口聚合
     * @return 当配置属性存在且不为空的情况下返回 true，反之返回false
     */
    public static boolean isWindowAggregate() {
        return !isPropertyNullOrEmpty("spark.window.duration");
    }

    /**
     * 从文件中获取窗口的持续时间
     * @return 时长字符串
     */
    public static String getWindowDuration() {
        return prop.getProperty("spark.window.duration");
    }

    /**
     * 获取带有时间戳的列数组
     * @return 列数组
     */
    public static Column[] getWindowColsWithTimestamp(){
        String[] properties = prop.getProperty("spark.window.cols").split(",");
        Column[] cols = getCols(properties);
        Column[] columns = new Column[cols.length + 1];
        Column timestamp = window(col("timestamp"),getWindowDuration());
        columns[0]=timestamp;
        System.arraycopy(cols, 0,columns, 1, cols.length);
        return columns;
    }

    /**
     * 判断是否读取外部数据，例如redis
     * @return 当配置属性存在且不为空的情况下返回 true，反之返回false
     */
    public static boolean isJoin(){
        return !isPropertyNullOrEmpty("spark.sql.join.name");
    }

    /**
     * 获取关联的外部数据来源名称
     * @return 全大写名称 redis  -> REDIS
     */
    public static String getJoinCapitalName(){
        if (isJoin()){
            return prop.getProperty("spark.sql.join.name").toUpperCase();
        }
        return "";
    }
    /**
     * 从文件获取连接的列名用于
     * Dataset.join(anotherDataset, getJoinExpr())
     * @return 返回列
     */
    public static Column getJoinExpr() {
        return expr(prop.getProperty("spark.sql.join.expr"));
    }

    /**
     * 私有方法，判断属性是否存在或为空
     * @param propName 属性名
     * @return 属性不存在或为空的情况下返回 true，否则返回false
     */
    private static boolean isPropertyNullOrEmpty(String propName) {
        String property = prop.getProperty(propName);
        if (property==null||property.trim().equals("")){
            return true;
        }
        return false;
    }

}
